package com.example.emos.wx.message.task;

import com.example.emos.wx.core.exception.EmosException;
import com.example.emos.wx.message.model.pojo.MessageEntity;
import com.example.emos.wx.message.model.pojo.MessageRefEntity;
import com.example.emos.wx.message.service.MessageService;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author YinXi
 * @Versin 1.0.0
 * @Date 2023/9/12
 */
@Component
@Slf4j
public class MessageTask {
    @Autowired
    private ConnectionFactory factory;
    @Autowired
    private MessageService messageService;

    /**
     * 发送消息
     * @param topic 队列的名称
     * @param entity 消息集合
     */
    public void send(String topic, MessageEntity entity){
        String id = messageService.insert(entity);
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()
        ) {
            //连接到topic队列
            channel.queueDeclare(topic,true,false,false,null);
            //往其中放入messageId
            HashMap map = new HashMap();
            map.put("messageId",id);
            //把messageId放入到AMQP协议的请求头中
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(map).build();
            //使用 channel发送消息，类比于JDBC中的statement
            channel.basicPublish("",topic,properties,entity.getMsg().getBytes());
            log.debug("消息发送成功");
        }catch (Exception e){
            log.error("执行异常",e);
            throw new EmosException("消息系统异常请稍后再试");
        }
    }
    /**
     * 异步发送消息
     */
    @Async
    public void sendAsync(String topic,MessageEntity entity){
        send(topic,entity);
    }

    /**
     * 接收消息
     */
    public int receive(String topic){
        //接收到的消息数量
        int i=0;
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()
        ) {
            //连接到topic队列
            channel.queueDeclare(topic,true,false,false,null);
            //循环从topic中接收消息
            while (true){
                GetResponse response = channel.basicGet(topic, false);
                if(response!=null){
                    AMQP.BasicProperties props = response.getProps();
                    //获取请求头的数据
                    Map<String,Object> map = props.getHeaders();
                    String messageId = map.get("messageId").toString();
                    byte[] body = response.getBody();
                    String msg = new String(body);
                    log.debug("从RabbitMQ接收的消息"+msg);
                    MessageRefEntity entity = new MessageRefEntity();
                    entity.setMessageId(messageId);
                    entity.setReceiverId(Integer.parseInt(topic));
                    entity.setReadFlag(false);
                    entity.setLastFlag(true);
                    messageService.insert(entity);
                    long deliveryTag = response.getEnvelope().getDeliveryTag();
                    channel.basicAck(deliveryTag,false);
                    i++;
                }else {
                    break;
                }
            }
        }catch (Exception e){
            log.error("执行异常",e);
            throw new EmosException("消息系统异常请稍后再试");
        }
        return i;
    }
    /**
     * 异步接收消息
     */
    @Async
    public int receiveAsync(String topic){
        return receive(topic);
    }
    /**
     * 删除一个消息队列
     */
    public void deleteQueue(String topic){
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()
        ) {
            channel.queueDelete(topic);
            log.debug("消息队列成功删除");
        }catch (Exception e){
            log.error("执行异常",e);
            throw new EmosException("消息系统异常请稍后再试");
        }
    }
    /**
     * 异步执行删除消息队列
     */
    @Async
    public void deleteQueueAsync(String topic){
        deleteQueue(topic);
    }
}
